package com.rtsapp.server.network.protocol.rpc.server;

import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import com.rtsapp.server.network.protocol.ProtocolConstants;
import com.rtsapp.server.network.protocol.rpc.client.RPCCallback;
import com.rtsapp.server.network.protocol.rpc.codec.*;
import com.rtsapp.server.network.session.Session;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;

import com.rtsapp.server.logger.Logger;
import io.netty.channel.ChannelHandlerContext;

/**
 * RPC服务器端Handler
 *
 * @author dengyingtuo
 */
@Sharable
public class RPCServerInHandler extends ObjectInHandler implements IRPCServerProxy{

    private static final Logger LOGGER = com.rtsapp.server.logger.LoggerFactory.getLogger(ObjectInHandler.class);



    /**
     * 用于存储所有RPC服务对象的map
     * key: xml中配置的beanName
     * value: 对应的对象
     */
    private final ConcurrentMap<String, RPCServiceInfo> serviceInfos = new ConcurrentHashMap<>();

    /**
     * 所有客户端ID, 服务器
     * ClientID - List<Session>
     */
    private final ConcurrentMap<String,ConcurrentMap<Session,Session>> clientIdSessionsMap = new ConcurrentHashMap<>();

    //session的key
    private static final String SESSION_KEY_CLIENTID = "session_key_clientid";
    //触发当前调用所对应的客户端ID
    private static final ThreadLocal<String> clientIdLocal = new ThreadLocal<String>();

    // 原子ID
    private final AtomicLong sequenceNoGenerator = new AtomicLong( 0 );

    // 所有等待处理的RPCCallback
    private final ConcurrentMap<Long, RPCCallback> pushCallbacks =  new ConcurrentHashMap<Long, RPCCallback >();




    @Override
    public void processMessage(Session session, Object message) {

        if( message instanceof RPCRegister){

            RPCRegister reg = (RPCRegister)message;

            registerSession( session, reg.getClientId() );

        }else if (message instanceof RPCRequest) {

            //绑定Session对应的clientId
            Object clientId = session.getAttribute( SESSION_KEY_CLIENTID );
            if( clientId != null && clientId instanceof  String ){
                clientIdLocal.set( (String)clientId);
            }else{
                clientIdLocal.set(null);
            }


            RPCRequest request = (RPCRequest) message;
            try {

                Object result = invoke(request.getObjectName(), request.getMethodName(), request.getParameters());

                if (request.getRpcType() != ProtocolConstants.RPCConstants.RPCType.oneway) {
                    RPCResponse response = new RPCResponse();
                    response.setSequenceNo(request.getSequenceNo());
                    response.setRpcType(request.getRpcType());
                    response.setErrorNo( ProtocolConstants.RPCConstants.RPCStatus.ok );
                    response.setResult(result);

                    session.send(response);
                }

            } catch (Throwable ex) {

                ex.printStackTrace();
                LOGGER.error("远程方法出现异常",ex);

                RPCResponse response = new RPCResponse();
                response.setSequenceNo(request.getSequenceNo());
                response.setRpcType(request.getRpcType());
                response.setErrorNo( ProtocolConstants.RPCConstants.RPCStatus.exception  );
                response.setResult(ex);

                session.send(response);
            }

        } else if( message instanceof  RPCPushResponse ){
            processPushCallback( session, (RPCPushResponse)message );
        } else {
            LOGGER.error("该类型的RPC还未实现");
        }

    }



    private Object invoke(String objectName, String methodName, Object[] parameters) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {

        RPCServiceInfo serviceInfo = serviceInfos.get(objectName);

        if (serviceInfo == null) {
            Object object = this.application.getObject(objectName);
            if (object == null) {
                LOGGER.error("no object with name" + objectName + "  found");
                throw new RuntimeException("no object with name" + objectName + "  found");
            }

            serviceInfo = new RPCServiceInfo(object);
            RPCServiceInfo oldServiceInfo = serviceInfos.putIfAbsent(objectName, serviceInfo);
            if (oldServiceInfo != null) {
                serviceInfo = oldServiceInfo;
            }
        }

        return serviceInfo.invoke(methodName, parameters);
    }



    @Override
    public String getCurrentClientId(){
        return clientIdLocal.get();
    }

    @Override
    public List<String> getAllClientIds(){
        return new ArrayList<>( this.clientIdSessionsMap.keySet());
    }


    @Override
    public boolean sendPush(String clientId, Object message, RPCCallback callback ){

        //加入到session结合
        ConcurrentMap<Session,Session> sessionMap =  clientIdSessionsMap.get( clientId );

        if( sessionMap != null && sessionMap.size() > 0 ){

            List<Session> sessionList = new ArrayList<>( sessionMap.keySet() );

            if( sessionList.size() > 0 ){

                //生成消息
                long sequenceNo = sequenceNoGenerator.incrementAndGet();

                RPCPush push  = new RPCPush();
                push.setSequenceNo( sequenceNo );
                push.setMessage( message );

                //获得Session
                Session session =   sessionList.get( (int)( Math.random() * sessionList.size() ) );
                session.write( push );

                //记住回调
                pushCallbacks.put( sequenceNo, callback);

                return true;
            }
        }

        return false;
    }

    private void processPushCallback( Session session, RPCPushResponse pushResp ){
        RPCCallback callback =  pushCallbacks.remove( pushResp.getSequenceNo() );
        if( callback == null ){
            return;
        }

        try {
            if (pushResp.getErrorNo() == ProtocolConstants.RPCConstants.RPCStatus.ok) {
                callback.onCompleted(pushResp.getResult());
            } else {
                callback.onError(new RuntimeException("push处理异常:", (Throwable) pushResp.getResult()));
            }
        }catch (Throwable ex ){
            LOGGER.error( ex );
        }

    }



    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        if( sessionManager != null ) {

            Channel channel = ctx.channel();
            Session session = sessionManager.getSession(channel);

            unregisterSession( session );
        }

        super.channelInactive( ctx );
    }



    private void registerSession(Session session, String clientId){

        //一旦绑定不能取消
        if( clientId == null || session == null ){
            return;
        }

        //绑定session
        session.setAttribute( SESSION_KEY_CLIENTID, clientId );


        //加入到session结合
        ConcurrentMap<Session,Session> sessionMap =  clientIdSessionsMap.get( clientId );
        if( sessionMap == null ){

            sessionMap = new ConcurrentHashMap<Session,Session>();
            ConcurrentMap<Session,Session> oldSessionMap =  clientIdSessionsMap.putIfAbsent( clientId, sessionMap );
            if( oldSessionMap != null ){
                sessionMap = oldSessionMap;
            }
        }

        sessionMap.put( session , session);

        LOGGER.debug( "RPC客户端ID注册: clientId:{}, session:{}", clientId, session );
    }

    private void unregisterSession(Session session){

        if( session == null ){
            return;
        }

        String clientId = (String)session.getAttribute( SESSION_KEY_CLIENTID );

        if( clientId != null ){
            session.removeAttribute( SESSION_KEY_CLIENTID );

            //加入到session结合
            ConcurrentMap<Session,Session> sessionMap =  clientIdSessionsMap.get( clientId );
            if( sessionMap != null ){
                sessionMap.remove( session );
            }

            LOGGER.debug( "RPC客户端ID断开: clientId:{}, session:{}", clientId, session );
        }
    }




}
